feat: add graceful shutdown with signal handling #174
server.add_insecure_port("[::]:50001")
print("Started listening at localhost:50001")
server.add_insecure_port(f"[::]:{options.port}")
This change was necessary to support end to end testing concurrently.
try:
    print(f"Terminating agent PID {proc.pid}")
    proc.terminate()
    proc.wait(timeout=shutdown_grace_period)
except subprocess.TimeoutExpired:
    # Process didn't die within timeout
    print(f"killing agent PID {proc.pid}")
    proc.kill()
looking good
src/isolate/server/server.py
Outdated
def initiate_shutdown(self, grace_period: float | None = None) -> None:
    if self._shutting_down:
        return
    self._shutting_down = True
    if grace_period is None:
        grace_period = SHUTDOWN_GRACE_PERIOD
Accepting grace_period as None reads as if it was the "infinite wait" option, but in reality you are assigning a default of SHUTDOWN_GRACE_PERIOD.
I would change grace_period to directly default to the SHUTDOWN_GRACE_PERIOD in the function params
Instead, I've removed it from the function parameters. I realized it was left over from tests in a previous revision, when the terminate -> kill timing was managed within initiate_shutdown. Now this function only uses the value to output a useful log message.
SHUTDOWN_GRACE_PERIOD is now passed through to the LocalPythonGRPC class during agent allocation, and it controls the terminate -> kill timing there instead.
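For readers following the terminate -> kill timing discussed above, here is a minimal standalone sketch of the pattern. It is not the code in LocalPythonGRPC; the function name `terminate_proc`, its signature, and its return values are assumptions made for illustration, and the behavior described is for POSIX systems.

```python
import subprocess


def terminate_proc(proc: subprocess.Popen, grace_period: float) -> str:
    """Hypothetical helper: SIGTERM first, SIGKILL if the grace period expires.

    Returns a short label describing how the process ended.
    """
    if proc.poll() is not None:
        # Nothing to do, the process has already exited.
        return "already-exited"

    proc.terminate()  # SIGTERM on POSIX
    try:
        proc.wait(timeout=grace_period)
        return "terminated"
    except subprocess.TimeoutExpired:
        # The process ignored or outlived SIGTERM; force it down.
        proc.kill()  # SIGKILL on POSIX
        proc.wait()  # reap the child so it does not linger as a zombie
        return "killed"
```

With a child that ignores SIGTERM, the helper should fall through to the kill branch once `grace_period` elapses; with a well-behaved child it should return as soon as the process exits.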
# Collect all active agents from running tasks
shutdown_threads = []
for task in self.background_tasks.values():
    if task.agent is not None:
and if it is None you should probably cancel it directly
src/isolate/server/server.py
Outdated
if self._server:
    print("Stopping gRPC server")
    self._server.stop(grace=0.1)  # Short grace period for server shutdown
I think bringing the server into the servicer context has no real benefit, you can call initiate_shutdown and call server.stop() right after in the SingleTaskInterceptor context
src/isolate/server/server.py
Outdated
def register_signal_handlers(self, server: grpc.Server) -> None:
    """Set up signal handlers for graceful shutdown"""
    self._server = server

    def signal_handler(signum, _):
        """Handle SIGTERM and SIGINT by gracefully shutting down server"""
        print(f"Received signal {signum}, shutting down server")
        self.initiate_shutdown()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
And then you would need to move this function out of this context too, just to a context that has access to both the server and the servicer
You can just define this right before the main function:

def signal_shutdown(signum, _):
    print(f"Received signal {signum}, shutting down server")
    servicer.initiate_shutdown()
    server.stop()

def main():
    # ...
    signal.signal(signal.SIGTERM, signal_shutdown)
    signal.signal(signal.SIGINT, signal_shutdown)

        # Process should be terminated after terminate_proc() returns
        assert proc.poll() is not None, "Process should be terminated by SIGTERM"

    def test_force_terminate(self):
good one
tests/test_shutdown.py
Outdated
def function_obj():
    import time

    time.sleep(10)
Should this be an infinite loop instead? That would make sure the test is not just taking 10 seconds to finish, but is actually closing because of the disconnect.
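One way to picture the suggestion: if the workload can never finish on its own, the only way the process can exit is by being stopped. The sketch below is a generic illustration using a plain subprocess, not the Isolate test itself; `LOOP_FOREVER` and `stop_and_measure` are hypothetical names.

```python
import subprocess
import sys
import time

# Child program that never returns on its own.
LOOP_FOREVER = "import time\nwhile True:\n    time.sleep(0.1)\n"


def stop_and_measure(timeout: float = 5.0):
    """Start an endless child, terminate it, and report (returncode, seconds)."""
    proc = subprocess.Popen([sys.executable, "-c", LOOP_FOREVER])
    time.sleep(0.2)
    # The child has no natural exit, so it must still be running here.
    assert proc.poll() is None, "child exited unexpectedly"

    start = time.monotonic()
    proc.terminate()
    proc.wait(timeout=timeout)  # raises TimeoutExpired if termination fails
    return proc.returncode, time.monotonic() - start
```

Because the child loops forever, a passing run cannot be explained by the workload simply completing, which is the ambiguity a fixed `time.sleep(10)` leaves open.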
def create_run_request(func, stream_logs=True):
    """Convert a Python function into a BoundFunction request for stub.Run()."""
    bound_function = functools.partial(func)
    serialized_function = to_serialized_object(bound_function, method="cloudpickle")

    env_def = EnvironmentDefinition()
    env_def.kind = "local"

    request = BoundFunction()
    request.function.CopyFrom(serialized_function)
    request.environments.append(env_def)
    request.stream_logs = stream_logs

    return request
@cmlad Here's a convenience function for passing normal functions to an IsolateServicer.Run call. FYI.
def register_signal_handlers(servicer: IsolateServicer, server: grpc.Server) -> None:
    def handle_signal(signum, frame):
        print(f"Received signal {signum}, initiating shutdown...")
        servicer.initiate_shutdown()
        server.stop(grace=0.1)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
@chamini2 Is this what you're suggesting here #174 (comment)?
Allows for graceful shutdown of the IsolateServicer and associated agents.
Adds the ISOLATE_SHUTDOWN_GRACE_PERIOD environment variable. Defaults to 60m.

Prior Behavior
- termination interceptor is called which stops the server, but does not terminate agent tasks. Agent tasks exit when they exit, and then main returns.

Proposed Behavior
- servicer.initiate_shutdown() called
- agent.Terminate() for all servicer's active background_tasks
- agent.Terminate SIGTERMs agent process pid by terminating self._bound_context, and SIGKILL after configured grace period

View client termination behavior before and after change.
shutdownbehavior.mov
Grace period handling is not complete, because agent signal handling cannot be propagated completely. When the agent subprocess receives a termination signal, the signal can only be received by the main execution thread of the Python application, which in this case is src/isolate/connections/grpc/agent.py:run_agent, a running gRPC server. Agent gRPC calls are handled by worker threads, and the Run handler calls the provided function within the context of the handler thread. This means any provided functions being executed within an isolate agent are not run by a thread that is capable of receiving signals.

This means that, within the current multithreaded agent approach, providing graceful shutdown capabilities to isolated Python functions requires an application-layer signal such as a passed context. A function running within an Isolate agent must therefore be context-aware if it wants to implement graceful shutdown.
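To make the threading constraint concrete, here is a small standalone sketch, independent of Isolate. It shows that CPython refuses to install a signal handler from a worker thread, and what an application-layer stop signal could look like. The names `try_register_in_worker` and `context_aware_function` are hypothetical, and using a `threading.Event` as the "passed context" is an assumption rather than the PR's design.

```python
import signal
import threading


def try_register_in_worker() -> str:
    """Try to call signal.signal() from a non-main thread and report the result."""
    result = []

    def worker():
        try:
            signal.signal(signal.SIGTERM, lambda signum, frame: None)
            result.append("registered")
        except ValueError as exc:
            # CPython only allows signal.signal() in the main thread.
            result.append(f"ValueError: {exc}")

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result[0]


def context_aware_function(stop: threading.Event, tick: float = 0.05) -> int:
    """Hypothetical user function that works until the passed event is set."""
    iterations = 0
    # Event.wait() returns True once the event is set, which ends the loop.
    while not stop.wait(tick):
        iterations += 1
    return iterations
```

In this arrangement the main thread would own the real signal handler and set the event, while the function running on the handler thread checks it cooperatively.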
Alternatively, modify the agent gRPC server to use AsyncIO instead of the current multithreaded approach. This would not have a performance hit, since the current approach is configured for a single worker thread. Isolate functions could then register signal handlers as they would normally, without needing to be context-aware.
Here's a rough sketch of what that would look like, with a test confirming the signal propagation through to execute_function does work.
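The linked sketch is not reproduced in this thread. Separately from it, the general mechanism can be illustrated with plain asyncio on a POSIX system: work running on the event loop in the main thread can react to a signal delivered to the process. This is not the agent's code, and `run_with_signal_handler` is a hypothetical name.

```python
import asyncio
import os
import signal


async def run_with_signal_handler() -> str:
    """Wait for SIGTERM on the event loop and return once it arrives."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # loop.add_signal_handler is Unix-only and requires the main thread's loop.
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    try:
        # Simulate an external SIGTERM by signalling our own process shortly.
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
        await asyncio.wait_for(stop.wait(), timeout=2.0)
        return "graceful"
    finally:
        # Restore default handling so later signals behave as usual.
        loop.remove_signal_handler(signal.SIGTERM)
```

Running it with `asyncio.run(run_with_signal_handler())` from the main thread should return once the handler fires, rather than the process being killed by the default SIGTERM action.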